package com.nx.arch.transactionmsg;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.nx.arch.transactionmsg.model.Msg;
import com.nx.arch.transactionmsg.model.State;
import com.nx.arch.transactionmsg.model.TxMsgDataSource;
import com.nx.arch.transactionmsg.utils.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/**
 * @类名称 TransactionMsgClient.java
 * @类描述 事务消息客户端基类
 * @作者  庄梦蝶殇 linhuaichuan@naixuejiaoyu.com
 * @创建时间 2020年4月12日 下午4:19:47
 * @版本 1.0.0
 *
 * @修改记录
 * <pre>
 *     版本                       修改人 		修改日期 		 修改内容描述
 *     ----------------------------------------------
 *     1.0.0 		庄梦蝶殇 	2020年4月12日             
 *     ----------------------------------------------
 * </pre>
 */
public abstract class TransactionMsgClient {
    private static final String MQ_PRODUCER_NAME = "TransactionMsgProducer";
    
    private static final int MIN_DELAY = 0;
    
    private static final int MAX_DELAY = 90 * 24 * 60 * 60;
    
    protected static final Logger LOGGER = LoggerFactory.getLogger(TransactionMsgClient.class);
    
    private DefaultMQProducer producer;
    
    private String mqAddr;
    
    private List<TxMsgDataSource> dbDataSources;
    
    protected MsgProcessor msgProcessor;
    
    private MsgStorage msgStorage;
    
    private Config config;
    
    private AtomicReference<State> state;
    
    /**
     * 
     * @param mqAddr
     * @param config
     * @param topicLists 沙箱和线上区分，需要是主题名字不一样（沙箱和线上是同一个db）,如果为空，默认扫描当前msg表下面，所有主题
     * @param dbDataSources 数据源，用户名，密码，url Url需要和业务dataSource中配置的url一模一样
     */
    protected TransactionMsgClient(String mqAddr, List<TxMsgDataSource> dbDataSources, List<String> topicLists, Config config) {
        this.mqAddr = mqAddr;
        this.dbDataSources = dbDataSources;
        msgStorage = new MsgStorage(dbDataSources, topicLists);
        // producer group
        producer = new DefaultMQProducer(MQ_PRODUCER_NAME);
        // mq 地址
        producer.setNamesrvAddr(this.mqAddr);
        msgProcessor = new MsgProcessor(producer, msgStorage);
        this.config = config;
        state = new AtomicReference<State>(State.CREATE);
    }
    
    /**
     * @param content 事务消息内容
     * @param topic 主题
     * @param tag 
     * @return 事务消息插入库的id
     * @throws RuntimeException spring aop 事务默认配置，如果是抛出RuntimeException才会回滚，如果是Exception不会回滚
     */
    public abstract Long sendMsg(String content, String topic, String tag)
        throws Exception;
    
    /**
     * 初始化内部MQ Producer,mysql 连接池，线程池等
     * @throws MQClientException 
     */
    public void init()
        throws Exception {
        if (state.get().equals(State.RUNNING)) {
            LOGGER.info("TransactionMsgClient have inited, return");
            return;
        }
        LOGGER.info("start init mqAddr={} state {} this {}", this.mqAddr, state, this);
        producer.setSendMsgTimeout(config.getSendMsgTimeout());
        if (config == null) {
            config = new Config();
        }
        try {
            producer.start(); // 启动producer
            msgProcessor.init(this.config);
            msgStorage.init(this.config);
            LOGGER.info("end init success");
        } catch (Exception ex) {
            LOGGER.error("producer start fail", ex);
            throw ex;
        }
        state.compareAndSet(State.CREATE, State.RUNNING);
    }
    
    public void close() {
        LOGGER.info("start close TransactionMsgClient");
        if (state.compareAndSet(State.RUNNING, State.CLOSED)) {
            msgProcessor.close();
            msgStorage.close();
            producer.shutdown();
        } else {
            LOGGER.info("state not right {} ", state);
        }
    }
    
    /**
     * 新增事务消息
     * @param con 如果我们拿不到连接，需要暴露出来，让业务方set Connection
     * @param content
     * @param topic
     * @param tag
     * @return
     * @throws Exception
     */
    public Long sendMsg(Connection con, String content, String topic, String tag, int delay)
        throws Exception {
        Long id = null;
        if (!state.get().equals(State.RUNNING)) {
            LOGGER.error("TransactionMsgClient not Running , please call init function");
            throw new Exception("TransactionMsgClient not Running , please call init function");
        }
        if (content == null || content.isEmpty() || topic == null || topic.isEmpty()) {
            LOGGER.error("content or topic is null or empty");
            throw new Exception("content or topic is null or empty, notice ");
        }
        if (!msgStorage.isInTopicLists(topic)) {
            LOGGER.error("wan't to send msg in topic " + topic + " which is not in topicLists of config, can't resend if send failed");
            throw new Exception("wan't to send msg in topic " + topic + " which is not in topicLists of config, can't resend if send failed");
        }
        if (delay < MIN_DELAY || delay > MAX_DELAY) {
            LOGGER.error("delay can't <" + MIN_DELAY + " or > " + MAX_DELAY);
            throw new Exception("delay can't <" + MIN_DELAY + " or > " + MAX_DELAY);
        }
        
        try {
            LOGGER.debug("insert to msgTable topic {} tag {} Connection {} Autocommit {} ", topic, tag, con, con.getAutoCommit());
            if (con.getAutoCommit()) {
                LOGGER.error("***** attention not in transaction ***** topic {} tag {} Connection {} Autocommit {} ", topic, tag, con, con.getAutoCommit());
                throw new Exception("connection not in transaction con " + con);
            }
            Map.Entry<Long, String> idUrlPair = MsgStorage.insertMsg(con, content, topic, tag, delay);
            id = idUrlPair.getKey();
            Msg msg = new Msg(id, idUrlPair.getValue());
            msgProcessor.putMsg(msg);
        } catch (Exception ex) {
            LOGGER.error("sendMsg fail topic {} tag {} ", topic, tag, ex);
            throw ex;
        }
        return id;
    }
    
    public String getMqAddr() {
        return mqAddr;
    }
    
    public void setMqAddr(String mqAddr) {
        this.mqAddr = mqAddr;
    }
    
    public List<TxMsgDataSource> getDbDataSources() {
        return dbDataSources;
    }
    
    public void setDbDataSources(List<TxMsgDataSource> dbDataSources) {
        this.dbDataSources = dbDataSources;
    }
    
    public Config getConfig() {
        return config;
    }
    
    /**
     * 
     * @param config 连接池，线程，内部时间周期等参数
     */
    public void setConfig(Config config) {
        this.config = config;
    }
    
}
